package streamsql

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestSQLTumblingWindow_ProcessingTime 测试处理时间的滚动窗口
// 验证不使用 WITH 子句时，滚动窗口基于处理时间（系统时钟）工作
func TestSQLTumblingWindow_ProcessingTime(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               AVG(temperature) as avg_temp,
               MIN(temperature) as min_temp,
               MAX(temperature) as max_temp
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 10)
	defer close(ch)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			select {
			case ch <- results:
			default:
				// 非阻塞发送
			}
		}
	})

	// 使用处理时间：发送数据，不包含时间戳字段
	// 滚动窗口基于数据到达的处理时间（系统时钟）来划分窗口
	for i := 0; i < 10; i++ {
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"temperature": float64(i),
		})
		time.Sleep(200 * time.Millisecond) // 每200ms发送一条数据
	}

	// 等待窗口触发（处理时间滚动窗口基于系统时钟触发）
	time.Sleep(3 * time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	if windowResultsLen > 0 {
		firstWindow := windowResultsCopy[0]
		if len(firstWindow) > 0 {
			row := firstWindow[0]
			cnt := row["cnt"].(float64)
			avgTemp := row["avg_temp"].(float64)
			minTemp := row["min_temp"].(float64)
			maxTemp := row["max_temp"].(float64)

			assert.Greater(t, cnt, 0.0, "窗口应该包含数据")
			assert.LessOrEqual(t, minTemp, maxTemp, "最小值应该小于等于最大值")
			assert.LessOrEqual(t, minTemp, avgTemp, "最小值应该小于等于平均值")
			assert.LessOrEqual(t, avgTemp, maxTemp, "平均值应该小于等于最大值")

			t.Logf("处理时间滚动窗口成功触发，数据量: %.0f, 平均温度: %.2f", cnt, avgTemp)
		}
	}
}

// TestSQLTumblingWindow_MaxOutOfOrderness 测试滚动窗口的最大延迟时间配置
// 验证设置 MaxOutOfOrderness 后，延迟数据能否在允许的延迟范围内被正确处理
func TestSQLTumblingWindow_MaxOutOfOrderness(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	// 使用 SQL 配置 MaxOutOfOrderness
	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               MIN(temperature) as min_temp,
               MAX(temperature) as max_temp
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms', MAXOUTOFORDERNESS='1s', IDLETIMEOUT='2s')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	// 模拟延迟数据场景
	// 场景：设置 MaxOutOfOrderness = 1秒，测试延迟数据能否在1秒内被正确处理
	// 窗口大小2秒，需要对齐到2秒的倍数
	windowSizeMs := int64(2000)                   // 2秒
	baseTimeRaw := time.Now().UnixMilli() - 10000 // 使用10秒前作为基准
	// 对齐baseTime到窗口大小的倍数，确保窗口对齐行为可预测
	baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs

	// 第一阶段：发送正常顺序的数据
	// 事件时间：0ms, 200ms, 400ms, ..., 2000ms（第一个窗口 [0ms, 2000ms)）
	t.Log("第一阶段：发送正常顺序的数据（事件时间 0-2000ms）")
	for i := 0; i < 10; i++ {
		eventTime := baseTime + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i), // 0-9
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待 watermark 推进，触发第一个窗口
	t.Log("等待 watermark 推进，触发第一个窗口")
	time.Sleep(3 * time.Second)

	// 第二阶段：发送延迟数据
	// 延迟数据的事件时间在第一个窗口内（如 500ms, 700ms, 900ms）
	// 如果 MaxOutOfOrderness = 1秒，这些数据应该能被处理
	t.Log("第二阶段：发送延迟数据（事件时间在第一个窗口内，延迟 < 1秒）")
	lateDataTimes := []int64{500, 700, 900} // 延迟数据的事件时间（相对于 baseTime）
	for i, lateTime := range lateDataTimes {
		eventTime := baseTime + lateTime
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(20 + i), // 20-22，用于标识延迟数据
		})
		time.Sleep(100 * time.Millisecond)
	}

	// 第三阶段：发送更多正常数据，推进 watermark
	t.Log("第三阶段：继续发送正常数据，推进 watermark")
	for i := 10; i < 15; i++ {
		eventTime := baseTime + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待窗口触发和延迟数据处理
	time.Sleep(3 * time.Second)

	// 收集所有窗口结果
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	// 验证窗口数据
	// 如果 MaxOutOfOrderness 配置正确，延迟数据应该能被统计到对应窗口
	if windowResultsLen > 0 {
		firstWindow := windowResultsCopy[0]
		if len(firstWindow) > 0 {
			cnt := firstWindow[0]["cnt"].(float64)
			minTemp := firstWindow[0]["min_temp"].(float64)
			maxTemp := firstWindow[0]["max_temp"].(float64)

			t.Logf("第一个窗口: cnt=%.0f, min=%.0f, max=%.0f", cnt, minTemp, maxTemp)

			// 验证窗口包含数据
			// 滚动窗口：窗口大小2秒，每200ms一条数据，理论上应该有10条数据
			// 但由于窗口对齐和 watermark 机制，实际数据量可能略有不同
			assert.GreaterOrEqual(t, cnt, 3.0, "第一个窗口应该包含足够的数据（滚动窗口特性）")
			assert.Equal(t, 0.0, minTemp, "第一个窗口的最小值应该是0（正常数据）")

			// 如果 MaxOutOfOrderness 配置正确，延迟数据应该被处理
			if maxTemp >= 20.0 {
				t.Logf("✓ 延迟数据被正确处理，最大值包含延迟数据: %.0f", maxTemp)
			} else {
				t.Logf("提示：延迟数据可能未被统计，当前最大值: %.0f", maxTemp)
			}
		}
	}

	t.Logf("总共触发了 %d 个窗口", windowResultsLen)
}

// TestSQLTumblingWindow_AllowedLateness 测试滚动窗口的 AllowedLateness 配置
// 验证窗口触发后，延迟数据能否在允许的延迟时间内更新窗口结果
func TestSQLTumblingWindow_AllowedLateness(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               MIN(temperature) as min_temp,
               MAX(temperature) as max_temp
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms', ALLOWEDLATENESS='1s')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	// 模拟 AllowedLateness 场景
	// 场景：窗口触发后，发送延迟数据，验证窗口能否更新
	baseTime := time.Now().UnixMilli() - 10000 // 使用10秒前作为基准

	// 第一阶段：发送正常顺序的数据，触发第一个窗口
	// 事件时间：0ms, 200ms, 400ms, ..., 2000ms（第一个窗口 [0ms, 2000ms)）
	t.Log("第一阶段：发送正常顺序的数据（事件时间 0-2000ms）")
	for i := 0; i < 10; i++ {
		eventTime := baseTime + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i), // 0-9
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待 watermark 推进，触发第一个窗口
	t.Log("等待 watermark 推进，触发第一个窗口")
	time.Sleep(3 * time.Second)

	// 第二阶段：发送延迟数据（事件时间在第一个窗口内）
	// 这些数据应该在 AllowedLateness = 1秒 内被处理
	t.Log("第二阶段：发送延迟数据（事件时间在第一个窗口内）")
	lateDataTimes := []int64{300, 600, 900} // 延迟数据的事件时间
	for i, lateTime := range lateDataTimes {
		eventTime := baseTime + lateTime
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(30 + i), // 30-32，用于标识延迟数据
		})
		time.Sleep(100 * time.Millisecond)
	}

	// 第三阶段：继续发送正常数据，推进 watermark
	t.Log("第三阶段：继续发送正常数据，推进 watermark")
	for i := 10; i < 15; i++ {
		eventTime := baseTime + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待窗口触发和延迟数据处理
	time.Sleep(3 * time.Second)

	// 收集所有窗口结果
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	// 验证窗口数据
	// 如果 AllowedLateness 配置正确，延迟数据应该能触发窗口的延迟更新
	if windowResultsLen > 0 {
		// 滚动窗口的延迟更新可能体现在后续的窗口结果中
		// 检查所有窗口结果，看是否有包含延迟数据的窗口
		hasLateData := false
		for i, window := range windowResultsCopy {
			if len(window) > 0 {
				cnt := window[0]["cnt"].(float64)
				minTemp := window[0]["min_temp"].(float64)
				maxTemp := window[0]["max_temp"].(float64)

				t.Logf("窗口 %d: cnt=%.0f, min=%.0f, max=%.0f", i+1, cnt, minTemp, maxTemp)

				// 验证窗口包含数据
				assert.GreaterOrEqual(t, cnt, 1.0, "窗口 %d 应该包含数据", i+1)

				// 如果 AllowedLateness 配置正确，延迟数据应该被处理
				// 延迟数据（temperature=30-32）应该能被统计
				if maxTemp >= 30.0 {
					hasLateData = true
					t.Logf("✓ 窗口 %d 包含延迟数据，最大值: %.0f", i+1, maxTemp)
				}
			}
		}

		// 验证是否有延迟更新（窗口可能触发多次）
		if windowResultsLen > 1 {
			t.Logf("✓ 滚动窗口触发了 %d 次，可能包含延迟更新", windowResultsLen)
		}

		if !hasLateData {
			t.Logf("提示：延迟数据可能未被统计，或延迟数据的时间不在窗口范围内")
		}
	}

	t.Logf("总共触发了 %d 个窗口", windowResultsLen)
}

// TestSQLTumblingWindow_BothConfigs 测试滚动窗口同时配置 MaxOutOfOrderness 和 AllowedLateness
// 验证两个配置组合使用时，延迟数据能否被正确处理
func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
	// 启用调试日志（可选，用于排查问题）
	// window.EnableDebug = true

	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               MIN(temperature) as min_temp,
               MAX(temperature) as max_temp
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (
            TIMESTAMP='eventTime',
            TIMEUNIT='ms',
            MAXOUTOFORDERNESS='1s',
            ALLOWEDLATENESS='500ms'
        )
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	// 模拟完整的延迟数据处理场景
	// 关键：确保 baseTime 对齐到窗口边界，以便窗口对齐行为可预测
	windowSizeMs := int64(2000) // 2秒
	baseTimeRaw := time.Now().UnixMilli() - 10000
	baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界
	maxOutOfOrdernessMs := int64(1000)                      // 1秒
	firstWindowEnd := baseTime + windowSizeMs
	// 关键：要触发窗口，需要 watermark >= windowEnd
	// watermark = maxEventTime - maxOutOfOrderness
	// 所以需要：maxEventTime - maxOutOfOrderness >= windowEnd
	// 即：maxEventTime >= windowEnd + maxOutOfOrderness
	requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs

	// 第一阶段：发送正常顺序的数据
	t.Log("第一阶段：发送正常顺序的数据")
	for i := 0; i < 10; i++ {
		eventTime := baseTime + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i), // 0-9
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待 watermark 推进（考虑 MaxOutOfOrderness = 1s）
	t.Log("等待 watermark 推进，触发窗口（MaxOutOfOrderness = 1s）")
	time.Sleep(3 * time.Second)

	// 第二阶段：发送延迟数据（事件时间在第一个窗口内）
	// MaxOutOfOrderness = 1s：这些数据应该在允许的乱序范围内
	// AllowedLateness = 500ms：窗口触发后还能接受500ms的延迟数据
	t.Log("第二阶段：发送延迟数据（事件时间在第一个窗口内）")
	lateDataTimes := []int64{400, 800, 1200} // 延迟数据的事件时间
	for i, lateTime := range lateDataTimes {
		eventTime := baseTime + lateTime
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(40 + i), // 40-42，用于标识延迟数据
		})
		time.Sleep(100 * time.Millisecond)
	}

	// 第三阶段：继续发送正常数据，推进 watermark
	// 关键：必须发送事件时间 >= requiredEventTimeForTrigger 的数据，才能让 watermark >= windowEnd
	t.Log("第三阶段：继续发送正常数据，推进 watermark")
	for i := 10; i < 15; i++ {
		eventTime := baseTime + int64(i*200)
		// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
		if i == 10 && eventTime < requiredEventTimeForTrigger {
			eventTime = requiredEventTimeForTrigger
		}
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待窗口触发和延迟数据处理
	time.Sleep(3 * time.Second)

	// 收集所有窗口结果
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	// 验证窗口数据
	if windowResultsLen > 0 {
		firstWindow := windowResultsCopy[0]
		if len(firstWindow) > 0 {
			cnt := firstWindow[0]["cnt"].(float64)
			minTemp := firstWindow[0]["min_temp"].(float64)
			maxTemp := firstWindow[0]["max_temp"].(float64)

			t.Logf("第一个窗口: cnt=%.0f, min=%.0f, max=%.0f", cnt, minTemp, maxTemp)

			// 验证窗口包含数据
			// 滚动窗口：窗口大小2秒，每200ms一条数据，理论上应该有10条数据
			// 但由于窗口对齐和 watermark 机制，实际数据量可能略有不同
			assert.GreaterOrEqual(t, cnt, 3.0, "第一个窗口应该包含足够的数据（滚动窗口特性）")
			assert.Equal(t, 0.0, minTemp, "第一个窗口的最小值应该是0（正常数据）")

			// 验证延迟数据是否被处理
			// 如果配置正确，maxTemp 可能包含延迟数据的值（40-42）
			if maxTemp >= 40.0 {
				t.Logf("✓ 延迟数据被正确处理，最大值包含延迟数据: %.0f", maxTemp)
			} else {
				t.Logf("提示：延迟数据可能未被统计，当前最大值: %.0f", maxTemp)
			}
		}

		// 验证是否有延迟更新
		if windowResultsLen > 1 {
			t.Logf("✓ 滚动窗口触发了 %d 次，可能包含延迟更新", windowResultsLen)

			// 验证后续窗口的数据
			for i := 1; i < windowResultsLen && i < 3; i++ {
				if len(windowResultsCopy[i]) > 0 {
					cnt := windowResultsCopy[i][0]["cnt"].(float64)
					t.Logf("窗口 %d: cnt=%.0f", i+1, cnt)
				}
			}
		}
	}

	t.Logf("总共触发了 %d 个窗口", windowResultsLen)
	t.Logf("配置验证：MaxOutOfOrderness=1s, AllowedLateness=500ms")
}

// TestSQLTumblingWindow_LateDataHandling 测试滚动窗口的延迟数据处理
// 验证即使数据延迟到达，只要在允许的延迟范围内，也能正确统计到对应窗口
func TestSQLTumblingWindow_LateDataHandling(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               MIN(temperature) as min_temp,
               MAX(temperature) as max_temp
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	// 使用事件时间：模拟延迟数据场景
	// 场景：先发送正常顺序的数据，然后发送一些延迟的数据
	// 窗口大小2秒，需要对齐到2秒的倍数
	windowSizeMs := int64(2000)                  // 2秒
	baseTimeRaw := time.Now().UnixMilli() - 5000 // 使用5秒前作为基准
	// 对齐baseTime到窗口大小的倍数，确保窗口对齐行为可预测
	baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs

	// 第一阶段：发送正常顺序的数据（事件时间：0ms, 200ms, 400ms, ..., 2000ms）
	// 这些数据应该被统计到第一个窗口 [0ms, 2000ms)
	t.Log("第一阶段：发送正常顺序的数据")
	for i := 0; i < 10; i++ {
		eventTime := baseTime + int64(i*200) // 每200ms一条数据
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i), // 温度值 0-9
		})
		time.Sleep(50 * time.Millisecond) // 处理时间间隔较小
	}

	// 等待 watermark 推进，让第一个窗口触发
	// 窗口大小2秒，第一个窗口应该在 watermark >= baseTime + 2000ms 时触发
	t.Log("等待 watermark 推进，触发第一个窗口")
	time.Sleep(3 * time.Second)

	// 第二阶段：发送延迟的数据
	// 这些数据的事件时间比之前的数据早，但应该在允许的延迟范围内
	// 延迟数据的事件时间：100ms, 300ms, 500ms（这些时间在第一个窗口 [0ms, 2000ms) 内）
	t.Log("第二阶段：发送延迟数据（事件时间在第一个窗口内）")
	for i := 0; i < 3; i++ {
		// 延迟数据：事件时间比正常数据早，但仍在窗口范围内
		eventTime := baseTime + int64(100+i*200) // 100ms, 300ms, 500ms
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(10 + i), // 温度值 10-12，用于区分延迟数据
		})
		time.Sleep(100 * time.Millisecond)
	}

	// 继续发送更多正常数据，推进 watermark
	t.Log("第三阶段：继续发送正常数据，推进 watermark")
	for i := 10; i < 15; i++ {
		eventTime := baseTime + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待窗口触发和延迟数据处理
	time.Sleep(3 * time.Second)

	// 收集所有窗口结果
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	// 验证第一个窗口的数据
	// 第一个窗口应该包含正常数据（0-9）和可能的延迟数据
	if windowResultsLen > 0 {
		firstWindow := windowResultsCopy[0]
		if len(firstWindow) > 0 {
			cnt := firstWindow[0]["cnt"].(float64)
			minTemp := firstWindow[0]["min_temp"].(float64)
			maxTemp := firstWindow[0]["max_temp"].(float64)

			t.Logf("第一个窗口: cnt=%.0f, min=%.0f, max=%.0f", cnt, minTemp, maxTemp)

			// 第一个窗口应该包含正常数据
			// 滚动窗口：窗口大小2秒，每200ms一条数据，理论上应该有10条数据
			// 但由于窗口对齐和 watermark 机制，实际数据量可能略有不同
			assert.GreaterOrEqual(t, cnt, 3.0, "第一个窗口应该包含足够的数据（滚动窗口特性）")
			assert.Equal(t, 0.0, minTemp, "第一个窗口的最小值应该是0（正常数据）")
			assert.GreaterOrEqual(t, maxTemp, 0.0, "第一个窗口的最大值应该大于等于0")
		}
	}

	// 验证延迟数据是否被处理
	// 如果延迟数据被正确处理，应该能在后续窗口或更新中看到
	t.Logf("总共触发了 %d 个窗口", windowResultsLen)
}

// TestSQLTumblingWindow_EventTimeWindowAlignment 测试事件时间窗口对齐到epoch
func TestSQLTumblingWindow_EventTimeWindowAlignment(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               window_start() as start,
               window_end() as end
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	// 使用事件时间：发送数据，验证窗口对齐到epoch
	// 窗口大小2秒，应该对齐到2秒的倍数
	baseTime := time.Now().UnixMilli()

	// 发送数据，事件时间从baseTime开始，每200ms一条
	// 第一个窗口应该对齐到小于等于baseTime的最大2秒倍数
	for i := 0; i < 15; i++ {
		eventTime := baseTime + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 等待窗口触发
	time.Sleep(3 * time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	// 验证窗口对齐
	windowSizeMs := int64(2000) // 2秒 = 2000毫秒
	for i, window := range windowResultsCopy {
		if len(window) > 0 {
			row := window[0]
			start := row["start"].(int64)
			end := row["end"].(int64)

			startMs := start / int64(time.Millisecond)
			endMs := end / int64(time.Millisecond)
			windowSizeNs := int64(windowSizeMs) * int64(time.Millisecond)

			assert.Equal(t, windowSizeNs, end-start,
				"窗口 %d 的大小应该是2秒（2000ms），实际: start=%d, end=%d", i+1, start, end)

			assert.Equal(t, int64(0), startMs%windowSizeMs,
				"窗口 %d 的开始时间应该对齐到2秒的倍数（epoch对齐），实际: startMs=%d", i+1, startMs)

			if i > 0 {
				prevEndMs := windowResultsCopy[i-1][0]["end"].(int64) / int64(time.Millisecond)
				assert.Equal(t, prevEndMs, startMs,
					"窗口 %d 的开始时间应该等于前一个窗口的结束时间，prevEndMs=%d, startMs=%d", i+1, prevEndMs, startMs)
			}

			t.Logf("窗口 %d: start=%d, end=%d, size=%dms", i+1, startMs, endMs, endMs-startMs)
		}
	}
}

// TestSQLTumblingWindow_WatermarkTriggerTiming 测试Watermark触发窗口的时机
func TestSQLTumblingWindow_WatermarkTriggerTiming(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               window_start() as start,
               window_end() as end
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms', MAXOUTOFORDERNESS='500ms', IDLETIMEOUT='2s')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	// 使用事件时间：发送数据，验证watermark触发时机
	baseTime := time.Now().UnixMilli() - 10000 // 使用10秒前作为基准
	maxOutOfOrdernessMs := int64(500)          // 500ms

	// 第一阶段：发送数据到第一个窗口 [alignedStart, alignedStart+2000)
	// 计算对齐后的窗口开始时间
	windowSizeMs := int64(2000)
	alignedStart := (baseTime / windowSizeMs) * windowSizeMs
	firstWindowEnd := alignedStart + windowSizeMs

	t.Logf("第一个窗口: [%d, %d)", alignedStart, firstWindowEnd)

	// 发送数据，事件时间在第一个窗口内
	for i := 0; i < 10; i++ {
		eventTime := alignedStart + int64(i*200) // 在窗口内
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 发送一个事件时间刚好等于window_end的数据，推进watermark
	// watermark = maxEventTime - maxOutOfOrderness = firstWindowEnd - 500
	// 此时 watermark < firstWindowEnd，窗口不应该触发
	ssql.Emit(map[string]interface{}{
		"deviceId":    "sensor001",
		"eventTime":   firstWindowEnd,
		"temperature": 100.0,
	})

	// 等待watermark更新（watermark更新间隔200ms）
	time.Sleep(500 * time.Millisecond)

	// 发送一个事件时间超过window_end的数据，推进watermark
	// watermark = maxEventTime - maxOutOfOrderness = (firstWindowEnd + 1000) - 500 = firstWindowEnd + 500
	// 此时 watermark >= firstWindowEnd，窗口应该触发
	ssql.Emit(map[string]interface{}{
		"deviceId":    "sensor001",
		"eventTime":   firstWindowEnd + 1000,
		"temperature": 200.0,
	})

	// 等待窗口触发
	time.Sleep(1 * time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	// 验证第一个窗口的触发时机
	if windowResultsLen > 0 {
		firstWindow := windowResultsCopy[0]
		if len(firstWindow) > 0 {
			row := firstWindow[0]
			start := row["start"].(int64)
			end := row["end"].(int64)

			startMs := start / int64(time.Millisecond)
			endMs := end / int64(time.Millisecond)

			assert.Equal(t, alignedStart, startMs,
				"第一个窗口的开始时间应该对齐到epoch，expected=%d, actual=%d", alignedStart, startMs)
			assert.Equal(t, firstWindowEnd, endMs,
				"第一个窗口的结束时间应该正确，expected=%d, actual=%d", firstWindowEnd, endMs)

			// 验证窗口在watermark >= window_end时触发
			// 由于watermark = maxEventTime - maxOutOfOrderness
			// 当maxEventTime = firstWindowEnd + 1000时，watermark = firstWindowEnd + 500
			// watermark >= firstWindowEnd，窗口应该触发
			t.Logf("✓ 窗口在watermark >= window_end时正确触发")
			t.Logf("窗口: [%d, %d), 触发时maxEventTime >= %d", start, end, end+maxOutOfOrdernessMs)
		}
	}
}

// TestSQLTumblingWindow_AllowedLatenessUpdate 测试AllowedLateness的延迟更新
func TestSQLTumblingWindow_AllowedLatenessUpdate(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               MIN(temperature) as min_temp,
               MAX(temperature) as max_temp,
               window_start() as start,
               window_end() as end
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms', MAXOUTOFORDERNESS='500ms', ALLOWEDLATENESS='1s', IDLETIMEOUT='2s')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	baseTime := time.Now().UnixMilli() - 10000
	windowSizeMs := int64(2000)
	alignedStart := (baseTime / windowSizeMs) * windowSizeMs
	firstWindowEnd := alignedStart + windowSizeMs
	allowedLatenessMs := int64(1000) // 1秒

	// 第一阶段：发送正常数据，触发第一个窗口
	t.Log("第一阶段：发送正常数据，触发第一个窗口")
	for i := 0; i < 10; i++ {
		eventTime := alignedStart + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i), // 0-9
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 推进watermark，触发第一个窗口
	ssql.Emit(map[string]interface{}{
		"deviceId":    "sensor001",
		"eventTime":   firstWindowEnd + 1000,
		"temperature": 100.0,
	})

	// 等待第一个窗口触发
	time.Sleep(1 * time.Second)

	// 收集第一个窗口的结果
	firstWindowReceived := false
	firstWindowCnt := 0.0
	firstWindowMax := 0.0
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 10
	iteration := 0

	for !firstWindowReceived && iteration < maxIterations {
		select {
		case res, ok := <-ch:
			if !ok {
				// channel 已关闭
				t.Fatal("应该收到第一个窗口")
			}
			if len(res) > 0 {
				firstWindowReceived = true
				firstWindowCnt = res[0]["cnt"].(float64)
				firstWindowMax = res[0]["max_temp"].(float64)
				t.Logf("第一个窗口（初始）: cnt=%.0f, max=%.0f", firstWindowCnt, firstWindowMax)
			}
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果
			iteration++
		case <-ctx.Done():
			t.Fatal("应该收到第一个窗口")
		}
	}

	// 第二阶段：发送延迟数据（事件时间在第一个窗口内，但在AllowedLateness范围内）
	t.Log("第二阶段：发送延迟数据（事件时间在第一个窗口内）")
	lateDataTimes := []int64{300, 600, 900} // 延迟数据的事件时间（相对于alignedStart）
	lateDataTemps := []float64{30.0, 31.0, 32.0}
	for i, lateTime := range lateDataTimes {
		eventTime := alignedStart + lateTime
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": lateDataTemps[i],
		})
		time.Sleep(100 * time.Millisecond)
	}

	// 继续发送正常数据，推进watermark（但不超过window_end + allowedLateness）
	ssql.Emit(map[string]interface{}{
		"deviceId":    "sensor001",
		"eventTime":   firstWindowEnd + allowedLatenessMs - 100, // 在allowedLateness范围内
		"temperature": 200.0,
	})

	// 等待延迟更新
	time.Sleep(1 * time.Second)

	// 收集所有窗口结果
	ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel2()
	maxIterations2 := 20
	iteration2 := 0

	for iteration2 < maxIterations2 {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration2++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx2.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")

	// 验证延迟更新
	hasLateUpdate := false
	for i, window := range windowResultsCopy {
		if len(window) > 0 {
			row := window[0]
			start := row["start"].(int64)
			end := row["end"].(int64)
			cnt := row["cnt"].(float64)
			maxTemp := row["max_temp"].(float64)

			startMs := start / int64(time.Millisecond)
			endMs := end / int64(time.Millisecond)

			if startMs == alignedStart && endMs == firstWindowEnd {
				if cnt > firstWindowCnt {
					hasLateUpdate = true
					t.Logf("✓ 窗口延迟更新: cnt从%.0f增加到%.0f, max从%.0f增加到%.0f",
						firstWindowCnt, cnt, firstWindowMax, maxTemp)

					// 验证延迟数据被包含
					assert.GreaterOrEqual(t, maxTemp, 30.0,
						"延迟更新应该包含延迟数据，maxTemp应该>=30.0，实际: %.0f", maxTemp)
				}
			}

			t.Logf("窗口 %d: [%d, %d), cnt=%.0f, max=%.0f", i+1, start, end, cnt, maxTemp)
		}
	}

	if !hasLateUpdate {
		t.Logf("⚠ 提示：未检测到延迟更新，可能延迟数据未被处理或窗口已关闭")
	} else {
		t.Logf("✓ AllowedLateness功能正常工作，延迟数据触发窗口更新")
	}
}

// TestSQLTumblingWindow_IdleSourceMechanism 测试Idle Source机制
// 验证当数据源空闲时，watermark基于处理时间推进，窗口能够正常关闭
func TestSQLTumblingWindow_IdleSourceMechanism(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               window_start() as start,
               window_end() as end
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms', MAXOUTOFORDERNESS='500ms', IDLETIMEOUT='2s')
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	// 使用事件时间：发送数据，然后停止发送，验证窗口能够关闭
	baseTime := time.Now().UnixMilli() - 10000
	windowSizeMs := int64(2000) // 2秒

	// 计算对齐后的第一个窗口开始时间
	alignedStart := (baseTime / windowSizeMs) * windowSizeMs
	firstWindowEnd := alignedStart + windowSizeMs

	t.Logf("第一个窗口: [%d, %d)", alignedStart, firstWindowEnd)

	// 第一阶段：发送数据，创建窗口
	t.Log("第一阶段：发送数据，创建窗口")
	for i := 0; i < 5; i++ {
		eventTime := alignedStart + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 第二阶段：停止发送数据，等待Idle Source机制触发
	// IdleTimeout = 2秒，意味着2秒无数据后，watermark会基于处理时间推进
	t.Log("第二阶段：停止发送数据，等待Idle Source机制触发（IdleTimeout=2s）")
	time.Sleep(3 * time.Second) // 等待超过IdleTimeout，确保watermark推进

	// 收集窗口结果
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
	copy(windowResultsCopy, windowResults)
	windowResultsMu.Unlock()

	// 验证窗口能够关闭（即使没有新数据）
	require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口（即使数据源空闲）")

	// 验证窗口数据
	if windowResultsLen > 0 {
		firstWindow := windowResultsCopy[0]
		if len(firstWindow) > 0 {
			row := firstWindow[0]
			start := row["start"].(int64)
			end := row["end"].(int64)
			cnt := row["cnt"].(float64)

			// 验证窗口边界正确
			// window_start() 和 window_end() 返回纳秒，需要转换为毫秒
			startMs := start / int64(time.Millisecond)
			endMs := end / int64(time.Millisecond)
			assert.Equal(t, alignedStart, startMs,
				"第一个窗口的开始时间应该对齐到窗口大小，expected=%d, actual=%d", alignedStart, startMs)
			assert.Equal(t, firstWindowEnd, endMs,
				"第一个窗口的结束时间应该正确，expected=%d, actual=%d", firstWindowEnd, endMs)

			// 验证窗口包含数据
			assert.Greater(t, cnt, 0.0, "窗口应该包含数据")

			t.Logf("✓ Idle Source机制正常工作，窗口在数据源空闲时能够关闭")
			t.Logf("窗口: [%d, %d), cnt=%.0f", start, end, cnt)
		}
	}
}

// TestSQLTumblingWindow_IdleSourceDisabled 测试Idle Source机制未启用的情况
// 验证当IdleTimeout=0（禁用）时，如果数据源空闲，窗口无法关闭
func TestSQLTumblingWindow_IdleSourceDisabled(t *testing.T) {
	ssql := New()
	defer ssql.Stop()

	sql := `
        SELECT deviceId,
               COUNT(*) as cnt,
               window_start() as start,
               window_end() as end
        FROM stream
        GROUP BY deviceId, TumblingWindow('2s')
        WITH (TIMESTAMP='eventTime', TIMEUNIT='ms', MAXOUTOFORDERNESS='500ms', IDLETIMEOUT='2s')
        -- 注意：没有配置IDLETIMEOUT，默认为0（禁用）
    `
	err := ssql.Execute(sql)
	require.NoError(t, err)

	ch := make(chan []map[string]interface{}, 20)
	windowResults := make([][]map[string]interface{}, 0)
	var windowResultsMu sync.Mutex
	ssql.AddSink(func(results []map[string]interface{}) {
		if len(results) > 0 {
			windowResultsMu.Lock()
			windowResults = append(windowResults, results)
			windowResultsMu.Unlock()
			ch <- results
		}
	})

	baseTime := time.Now().UnixMilli() - 10000
	windowSizeMs := int64(2000)
	alignedStart := (baseTime / windowSizeMs) * windowSizeMs

	// 发送数据，但事件时间不足以触发窗口
	t.Log("发送数据，但事件时间不足以触发窗口")
	for i := 0; i < 3; i++ {
		eventTime := alignedStart + int64(i*200)
		ssql.Emit(map[string]interface{}{
			"deviceId":    "sensor001",
			"eventTime":   eventTime,
			"temperature": float64(i),
		})
		time.Sleep(50 * time.Millisecond)
	}

	// 停止发送数据，等待一段时间
	// 由于IdleTimeout未启用，watermark不会基于处理时间推进
	t.Log("停止发送数据，等待（IdleTimeout未启用）")
	time.Sleep(3 * time.Second)

	// 收集窗口结果
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	maxIterations := 20
	iteration := 0

	for iteration < maxIterations {
		select {
		case result, ok := <-ch:
			if !ok {
				// channel 已关闭
				goto END
			}
			_ = result // 使用结果
			iteration++
		case <-time.After(500 * time.Millisecond):
			// 500ms 没有新结果，退出
			goto END
		case <-ctx.Done():
			// 超时退出
			goto END
		}
	}

END:
	windowResultsMu.Lock()
	windowResultsLen := len(windowResults)
	windowResultsMu.Unlock()

	// 注意：这个测试可能无法完全验证窗口无法关闭
	// 因为如果watermark已经推进到足够的位置，窗口可能已经触发
	// 这个测试主要用于对比：启用Idle Source vs 未启用Idle Source
	t.Logf("窗口结果数量: %d（IdleTimeout未启用）", windowResultsLen)
}
